Ovo je poslednji članak u seriji „Clean Code in Go“. Претходни делови: Чист код: функције и управљање грешкама у иду: од хаоса до јасноће [Део 1] Чист код у покрету (Део 2): Структура, методе и састав над наслеђивањем Чист код: интерфејси у ходу - зашто је мали леп [Део 3] Чист код у покрету (Део 4): Архитектура пакета, ток зависности и скалабилност Увод: Зашто је конкуренција посебна Дебугирао сам цурења горутина у 3 ујутру, фиксне услове трке који су се појавили само под оптерећењем, и гледао једну недостајућу Изјава доноси доле услугу производње. "Не комуницирајте дељењем меморије; делите меморију комуникацијом" - ова Го мантра претворила је истовремено програмирање на главу. Уместо мутекса и семафора - канала. Уместо жица - горутина. Уместо позива - изаберите. И све ово са контекстом за управљање животним циклусом. defer Уобичајене конкурентне грешке које сам наишао: Гороутин цурења: ~40% проблема са меморијом производње Услови трке са заједничким статусом: ~35% истовременог кода Недостаје укидање контекста: ~50% временских грешака Deadlocks od zloupotrebe kanala: ~25% od obesnih usluga Погрешна употреба мутекса (пријемник вредности): ~30% синхронизованих грешака Након 6 година рада са Го-ом и системима који обрађују милионе захтева, могу рећи: правилна употреба горутина и контекста је разлика између елегантног решења и инцидента у производњи у 3 сата. Tema: Upravljanje životnim ciklusom Прво правило контекста // RULE: context.Context is ALWAYS the first parameter func GetUser(ctx context.Context, userID string) (*User, error) { // correct } func GetUser(userID string, ctx context.Context) (*User, error) { // wrong - violates convention } отказивање // BAD: operation cannot be cancelled func SlowOperation() (Result, error) { time.Sleep(10 * time.Second) // always waits 10 seconds return Result{}, nil } // GOOD: operation respects context func SlowOperation(ctx context.Context) (Result, error) { select { case <-time.After(10 * time.Second): return Result{}, nil case <-ctx.Done(): return Result{}, ctx.Err() } } // Usage with timeout ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() result, err := SlowOperation(ctx) if err == context.DeadlineExceeded { log.Println("Operation timed out") } Kontekstne vrednosti: Koristite pažljivo! // BAD: using context for business logic type key string const userKey key = "user" func WithUser(ctx context.Context, user *User) context.Context { return context.WithValue(ctx, userKey, user) } func GetUser(ctx context.Context) *User { return ctx.Value(userKey).(*User) // panic if no user! } // GOOD: context only for request metadata type contextKey string const ( requestIDKey contextKey = "requestID" traceIDKey contextKey = "traceID" ) func WithRequestID(ctx context.Context, requestID string) context.Context { return context.WithValue(ctx, requestIDKey, requestID) } func GetRequestID(ctx context.Context) string { if id, ok := ctx.Value(requestIDKey).(string); ok { return id } return "" } // BETTER: explicit parameter passing func ProcessOrder(ctx context.Context, user *User, order *Order) error { // user passed explicitly, not through context return nil } Goroutines: Лагана конкуренција Ознака: worker pool // Worker pool to limit concurrency type WorkerPool struct { workers int jobs chan Job results chan Result wg sync.WaitGroup } type Job struct { ID int Data []byte } type Result struct { JobID int Output []byte Error error } func NewWorkerPool(workers int) *WorkerPool { return &WorkerPool{ workers: workers, jobs: make(chan Job, workers*2), results: make(chan Result, workers*2), } } func (p *WorkerPool) Start(ctx context.Context) { for i := 0; i < p.workers; i++ { p.wg.Add(1) go p.worker(ctx, i) } } func (p *WorkerPool) worker(ctx context.Context, id int) { defer p.wg.Done() for { select { case job, ok := <-p.jobs: if !ok { return } result := p.processJob(job) select { case p.results <- result: case <-ctx.Done(): return } case <-ctx.Done(): return } } } func (p *WorkerPool) processJob(job Job) Result { // Process job output := bytes.ToUpper(job.Data) return Result{ JobID: job.ID, Output: output, } } func (p *WorkerPool) Submit(job Job) { p.jobs <- job } func (p *WorkerPool) Shutdown() { close(p.jobs) p.wg.Wait() close(p.results) } // Usage func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() pool := NewWorkerPool(10) pool.Start(ctx) // Submit jobs for i := 0; i < 100; i++ { pool.Submit(Job{ ID: i, Data: []byte(fmt.Sprintf("job-%d", i)), }) } // Collect results go func() { for result := range pool.results { log.Printf("Result %d: %s", result.JobID, result.Output) } }() // Graceful shutdown pool.Shutdown() } Ознака: fan-out / fan-in // Fan-out: distribute work among goroutines func fanOut(ctx context.Context, in <-chan int, workers int) []<-chan int { outputs := make([]<-chan int, workers) for i := 0; i < workers; i++ { output := make(chan int) outputs[i] = output go func() { defer close(output) for { select { case n, ok := <-in: if !ok { return } // Heavy work result := n * n select { case output <- result: case <-ctx.Done(): return } case <-ctx.Done(): return } } }() } return outputs } // Fan-in: collect results from goroutines func fanIn(ctx context.Context, inputs ...<-chan int) <-chan int { output := make(chan int) var wg sync.WaitGroup for _, input := range inputs { wg.Add(1) go func(ch <-chan int) { defer wg.Done() for { select { case n, ok := <-ch: if !ok { return } select { case output <- n: case <-ctx.Done(): return } case <-ctx.Done(): return } } }(input) } go func() { wg.Wait() close(output) }() return output } // Usage func pipeline(ctx context.Context) { // Number generator numbers := make(chan int) go func() { defer close(numbers) for i := 1; i <= 100; i++ { select { case numbers <- i: case <-ctx.Done(): return } } }() // Fan-out to 5 workers workers := fanOut(ctx, numbers, 5) // Fan-in results results := fanIn(ctx, workers...) // Process results for result := range results { fmt.Printf("Result: %d\n", result) } } Канал: Грађани прве класе Управљачки канали // BAD: bidirectional channel everywhere func producer(ch chan int) { ch <- 42 } func consumer(ch chan int) { value := <-ch } // GOOD: restrict direction func producer(ch chan<- int) { // send-only ch <- 42 } func consumer(ch <-chan int) { // receive-only value := <-ch } // Compiler will check correct usage func main() { ch := make(chan int) go producer(ch) go consumer(ch) } Избор и не-блокирање операција // Pattern: timeout with select func RequestWithTimeout(url string, timeout time.Duration) ([]byte, error) { result := make(chan []byte, 1) errCh := make(chan error, 1) go func() { resp, err := http.Get(url) if err != nil { errCh <- err return } defer resp.Body.Close() data, err := io.ReadAll(resp.Body) if err != nil { errCh <- err return } result <- data }() select { case data := <-result: return data, nil case err := <-errCh: return nil, err case <-time.After(timeout): return nil, fmt.Errorf("request timeout after %v", timeout) } } // Non-blocking send func TrySend(ch chan<- int, value int) bool { select { case ch <- value: return true default: return false // channel full } } // Non-blocking receive func TryReceive(ch <-chan int) (int, bool) { select { case value := <-ch: return value, true default: return 0, false // channel empty } } Услови трке и како их избегавати Ključna reč: data race // DANGEROUS: data race type Counter struct { value int } func (c *Counter) Inc() { c.value++ // NOT atomic! } func (c *Counter) Value() int { return c.value // race on read } // Check: go test -race Решење 1: Мутекс type SafeCounter struct { mu sync.RWMutex value int } func (c *SafeCounter) Inc() { c.mu.Lock() defer c.mu.Unlock() c.value++ } func (c *SafeCounter) Value() int { c.mu.RLock() defer c.mu.RUnlock() return c.value } // Pattern: protecting invariants type BankAccount struct { mu sync.Mutex balance decimal.Decimal } func (a *BankAccount) Transfer(to *BankAccount, amount decimal.Decimal) error { // Important: always lock in same order (by ID) // to avoid deadlock if a.ID() < to.ID() { a.mu.Lock() defer a.mu.Unlock() to.mu.Lock() defer to.mu.Unlock() } else { to.mu.Lock() defer to.mu.Unlock() a.mu.Lock() defer a.mu.Unlock() } if a.balance.LessThan(amount) { return ErrInsufficientFunds } a.balance = a.balance.Sub(amount) to.balance = to.balance.Add(amount) return nil } Решење 2: Канали за синхронизацију // Use channels instead of mutexes type ChannelCounter struct { ch chan countOp } type countOp struct { delta int resp chan int } func NewChannelCounter() *ChannelCounter { c := &ChannelCounter{ ch: make(chan countOp), } go c.run() return c } func (c *ChannelCounter) run() { value := 0 for op := range c.ch { value += op.delta if op.resp != nil { op.resp <- value } } } func (c *ChannelCounter) Inc() { c.ch <- countOp{delta: 1} } func (c *ChannelCounter) Value() int { resp := make(chan int) c.ch <- countOp{resp: resp} return <-resp } Конкурентни модели Оригинални наслов: Graceful Shutdown type Server struct { server *http.Server shutdown chan struct{} done chan struct{} } func NewServer(addr string) *Server { return &Server{ server: &http.Server{ Addr: addr, }, shutdown: make(chan struct{}), done: make(chan struct{}), } } func (s *Server) Start() { go func() { defer close(s.done) if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Printf("Server error: %v", err) } }() // Wait for shutdown signal go func() { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) select { case <-sigCh: case <-s.shutdown: } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := s.server.Shutdown(ctx); err != nil { log.Printf("Shutdown error: %v", err) } }() } func (s *Server) Stop() { close(s.shutdown) <-s.done } Модел: Ограничење стопе type RateLimiter struct { rate int bucket chan struct{} stop chan struct{} } func NewRateLimiter(rate int) *RateLimiter { rl := &RateLimiter{ rate: rate, bucket: make(chan struct{}, rate), stop: make(chan struct{}), } // Fill bucket for i := 0; i < rate; i++ { rl.bucket <- struct{}{} } // Refill bucket at given rate go func() { ticker := time.NewTicker(time.Second / time.Duration(rate)) defer ticker.Stop() for { select { case <-ticker.C: select { case rl.bucket <- struct{}{}: default: // bucket full } case <-rl.stop: return } } }() return rl } func (rl *RateLimiter) Allow() bool { select { case <-rl.bucket: return true default: return false } } func (rl *RateLimiter) Wait(ctx context.Context) error { select { case <-rl.bucket: return nil case <-ctx.Done(): return ctx.Err() } } Модел: цевовод са управљањем грешкама // Pipeline stage with error handling type Stage func(context.Context, <-chan int) (<-chan int, <-chan error) // Compose stages func Pipeline(ctx context.Context, stages ...Stage) (<-chan int, <-chan error) { var ( dataOut = make(chan int) errOut = make(chan error) dataIn <-chan int errIn <-chan error ) // Start generator start := make(chan int) go func() { defer close(start) for i := 1; i <= 100; i++ { select { case start <- i: case <-ctx.Done(): return } } }() dataIn = start // Apply stages for _, stage := range stages { dataIn, errIn = stage(ctx, dataIn) // Collect errors go func(errors <-chan error) { for err := range errors { select { case errOut <- err: case <-ctx.Done(): return } } }(errIn) } // Final output go func() { defer close(dataOut) for val := range dataIn { select { case dataOut <- val: case <-ctx.Done(): return } } }() return dataOut, errOut } Практични типови Uvek koristite kontekst za duge operacije Не рађајте гороутине неконтролисано - користите радничке базене Преферирајте канале за мутекс за координацију Користите синц / атомски за једноставне бројаче Тестирање са -race flag Ограничење правца канала Увек размишљајте о грациозном затварању Конкурентна контролна листа Контекст прошао као први параметар Goroutine se mogu zaustaviti kroz kontekst Нема сиротињских горутина (утицаји) Канали затворени по емитеру Мутекс закључана у истом редоследу Тестови пролазе са -race flag Ima graciozno zatvaranje Радни базен за велике операције Закључак Конкуренција у Го-у није само карактеристика, то је филозофија језика. Правилна употреба гороутина, канала и контекста омогућава писање елегантног истовременог кода без традиционалних проблема мултитреадинг-а. Овај чланак завршава серију "Чист код у Гоу".Покрили смо пут од функција до истовремености, додирујући све кључне аспекте писања идиоматског Гоо кода. Запамтите: Гоо је о једноставности, а чист код у Гоу је код који следи идиоме језика. Који је ваш најгори инцидент у производњи узрокован условима трке? Како тестирате истовремени код? Који обрасци су вас спасили од цурења горутина? Поделите своје ратне приче у коментарима!